<?php

class Job
{
    protected $workerNum = 4;
    protected $managerNum = 2;
    protected $quene;

    public function __construct()
    {
        $key = ftok(__DIR__, 'j');
        $this->quene = msg_get_queue($key);

        $this->create_manager();
        $this->create_worker();
        $this->monitor();
    }


    public function create_worker()
    {
        for ($i = 0; $i < $this->workerNum; $i++) {
            $process = new Swoole\Process(function () use ($i) {
                //子进程空间执行业务逻辑
                sleep(2);
                //抢占式，只有一个进程执行，如果获取完，其他进程获取不到
                msg_receive($this->quene, 0, $msg_type, 1024, $message, false); //会休眠阻塞
                //获取进程pid
                echo getmypid() . '正在执行任务' . $message . PHP_EOL;
            });
            $process->start();  //自动进程，创建进程
        }
    }


    //创建主进程
    public function create_manager()
    {
        for ($i = 0; $i < $this->managerNum; $i++) {
            $process = new Swoole\Process(function () use ($i) {
                //tpc方式接收投递的任务

                //

                //子进程空间执行业务逻辑
                sleep(2);
                msg_send($this->quene, 1, '子进程给你发消息了', false, false);

                //获取进程pid
                echo getmypid() . '正在执行任务' . $i . PHP_EOL;
            });
            $process->start();  //自动进程，创建进程
        }
    }

    //监听结果
    public function monitor()
    {
        //正常回收子进程
        while ($ret = Swoole\Process::wait(true)) {
            var_dump($ret);
        }
    }
}

new Job();